package Streaming

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.streaming.Seconds

/**
 * Structured Streaming quick-start demo: keeps a running word count over text
 * lines read from a TCP socket and prints the full result table to the console.
 *
 * Usage: `quickstarDemo [host] [port]`
 *
 * Both arguments are optional. When omitted, the source connects to
 * `192.168.163.8:9999`, the endpoint this demo was originally written against.
 */
object quickstarDemo {

  import scala.util.Try

  /** Socket source host used when no host argument is supplied. */
  private val DefaultHost: String = "192.168.163.8"

  /** Socket source port used when no port argument is supplied. */
  private val DefaultPort: Int = 9999

  /** Interval between micro-batches, in milliseconds. */
  private val TriggerIntervalMs: Long = 3000L

  /**
   * Parses a TCP port from a command-line argument.
   *
   * @param raw the argument text as given by the user
   * @return the port number, in the range 1 to 65535
   * @throws IllegalArgumentException if `raw` is not an integer in that range
   */
  private def parsePort(raw: String): Int =
    Try(raw.trim.toInt).toOption
      .filter(p => p > 0 && p <= 65535)
      .getOrElse(throw new IllegalArgumentException(s"Invalid port: '$raw'"))

  /**
   * Starts the streaming word count and blocks until the query terminates.
   *
   * @param args optional `[host] [port]` of the socket to read lines from
   */
  def main(args: Array[String]): Unit = {
    val host: String = args.headOption.map(_.trim).filter(_.nonEmpty).getOrElse(DefaultHost)
    // A port that is present but malformed fails loudly instead of silently using the default.
    val port: Int = args.lift(1).fold(DefaultPort)(parsePort)

    val spark: SparkSession = SparkSession
      .builder.master("local[2]")
      .appName("StructuredNetworkWordCount")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    try {
      // DataFrame representing the stream of input lines read from host:port.
      val lines: DataFrame = spark.readStream
        .format("socket")
        .option("host", host)
        .option("port", port.toLong)
        .load()

      // Split each line into words. Blank lines and repeated spaces produce empty
      // tokens, which are dropped so they are not counted as a word.
      val words: Dataset[String] =
        lines.as[String].flatMap(line => line.split(" ").filter(_.nonEmpty))

      // Running count per distinct word ("value" is the column name of a Dataset[String]).
      val wordCounts: DataFrame = words.groupBy("value").count()

      // "complete" mode re-emits the whole counts table on every trigger.
      val query: StreamingQuery = wordCounts.writeStream
        .outputMode("complete")
        .format("console")
        .trigger(Trigger.ProcessingTime(TriggerIntervalMs))
        .start()

      query.awaitTermination()
    } finally {
      // Release the session even when the query terminates with an error.
      spark.stop()
    }
  }

}
